#include <Formats/FormatSchemaInfo.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/filesystemHelpers.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <filesystem>


namespace DB
{
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}

namespace fs = std::filesystem;

namespace
{
    String getFormatSchemaDefaultFileExtension(const String & format)
    {
        if (format == "Protobuf")
            return "proto";
        else if (format == "CapnProto")
            return "capnp";
        else
            return "";
    }
}


FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path)
{
    if (format_schema.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The format {} requires a schema. The corresponding setting should be set", format);

    String default_file_extension = getFormatSchemaDefaultFileExtension(format);

    fs::path path;
    if (require_message)
    {
        size_t colon_pos = format_schema.find(':');
        if ((colon_pos == String::npos) || (colon_pos == 0) || (colon_pos == format_schema.length() - 1))
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format{}. Got '{}'",
                    (default_file_extension.empty() ? "" : ", e.g. 'schema." + default_file_extension + ":Message'"), format_schema);
        }
        else
        {
            path = fs::path(format_schema.substr(0, colon_pos));
            String filename = path.has_filename() ? path.filename() : path.parent_path().filename();
            if (filename.empty())
                throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format{}. Got '{}'",
                    (default_file_extension.empty() ? "" : ", e.g. 'schema." + default_file_extension + ":Message'"), format_schema);
        }
        message_name = format_schema.substr(colon_pos + 1);
    }
    else
    {
        path = fs::path(format_schema);
        if (!path.has_filename())
            path = path.parent_path() / "";
    }

    auto default_schema_directory = [&format_schema_path]()
    {
        static const String str = fs::canonical(format_schema_path) / "";
        return str;
    };

    if (!path.has_extension() && !default_file_extension.empty())
        path = path.parent_path() / (path.stem().string() + '.' + default_file_extension);

    fs::path default_schema_directory_path(default_schema_directory());
    if (path.is_absolute())
    {
        if (is_server)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Absolute path in the 'format_schema' setting is prohibited: {}", path.string());
        schema_path = path.filename();
        schema_directory = path.parent_path() / "";
    }
    else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string()))
    {
        if (is_server)
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
                default_schema_directory(),
                path.string(),
                default_schema_directory());
        path = default_schema_directory_path / path;
        schema_path = path.filename();
        schema_directory = path.parent_path() / "";
    }
    else
    {
        schema_path = path;
        schema_directory = default_schema_directory();
    }
}

FormatSchemaInfo::FormatSchemaInfo(const FormatSettings & settings, const String & format, bool require_message)
    : FormatSchemaInfo(
        settings.schema.format_schema, format, require_message, settings.schema.is_server, settings.schema.format_schema_path)
{
}

template <typename SchemaGenerator>
MaybeAutogeneratedFormatSchemaInfo<SchemaGenerator>::MaybeAutogeneratedFormatSchemaInfo(
    const FormatSettings & settings, const String & format, const Block & header, bool use_autogenerated_schema)
{
    if (!use_autogenerated_schema || !settings.schema.format_schema.empty())
    {
        schema_info = std::make_unique<FormatSchemaInfo>(settings, format, true);
        return;
    }

    String schema_path;
    fs::path default_schema_directory_path(fs::canonical(settings.schema.format_schema_path) / "");
    fs::path path;
    if (!settings.schema.output_format_schema.empty())
    {
        schema_path = settings.schema.output_format_schema;
        path = schema_path;
        if (path.is_absolute())
        {
            if (settings.schema.is_server)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Absolute path in the 'output_format_schema' setting is prohibited: {}", path.string());
        }
        else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string()))
        {
            if (settings.schema.is_server)
                throw Exception(
                    ErrorCodes::BAD_ARGUMENTS,
                    "Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
                    default_schema_directory_path.string(),
                    path.string(),
                    default_schema_directory_path.string());
            path = default_schema_directory_path / path;
        }
        else
        {
            path = default_schema_directory_path / path;
        }
    }
    else
    {
        if (settings.schema.is_server)
        {
            tmp_file_path = PocoTemporaryFile::tempName(default_schema_directory_path.string()) + '.' + getFormatSchemaDefaultFileExtension(format);
            schema_path = fs::path(tmp_file_path).filename();
        }
        else
        {
            tmp_file_path = PocoTemporaryFile::tempName() + '.' + getFormatSchemaDefaultFileExtension(format);
            schema_path = tmp_file_path;
        }

        path = tmp_file_path;
    }

    WriteBufferFromFile buf(path.string());
    SchemaGenerator::writeSchema(buf, "Message", header.getNamesAndTypesList());
    buf.finalize();

    schema_info = std::make_unique<FormatSchemaInfo>(schema_path + ":Message", format, true, settings.schema.is_server, settings.schema.format_schema_path);
}

template <typename SchemaGenerator>
MaybeAutogeneratedFormatSchemaInfo<SchemaGenerator>::~MaybeAutogeneratedFormatSchemaInfo()
{
    if (!tmp_file_path.empty())
    {
        try
        {
            fs::remove(tmp_file_path);
        }
        catch (...)
        {
            tryLogCurrentException("MaybeAutogeneratedFormatSchemaInfo", "Cannot delete temporary schema file");
        }
    }
}

template class MaybeAutogeneratedFormatSchemaInfo<StructureToCapnProtoSchema>;
template class MaybeAutogeneratedFormatSchemaInfo<StructureToProtobufSchema>;

}
